const {Kafka} = require('kafkajs')
let producer

const initKafka = async (clientId, brokers) => {
  const kafka = new Kafka({
    clientId: clientId,
    brokers: [brokers]
  })
  producer = kafka.producer()
  await producer.connect()
  console.log('kafka producer 连接成功')
}

/**
 * 异步发送
 */
const sendMessage = async (topic, message) => {
  return producer.send({
    topic: topic,
    messages: [
      {key: topic, value: message} // 这里暂时用topic本身作为消息的key(为了照顾iot的需求)
    ]
  })
}

/**
 * 同步发送
 */
const sendMessageSync = async (topic, message) => {
  await producer.send({
    topic: topic,
    messages: [
      {value: message}
    ]
  })
}

export default {initKafka, sendMessage, sendMessageSync}
